package com.free.bsf.demo.rabbitmq;


import com.free.bsf.core.serialize.JsonSerializer;
import com.free.bsf.mq.rabbitmq.*;
import com.free.bsf.mq.base.DelayTimeEnum;
import lombok.val;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * RabbitMQ 测试
 */
@SpringBootApplication
@RestController
public class RabbitmqApplication {

    @Autowired(required = false)
    private RabbitMQProducerProvider rabbitMQProducerProvider;

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(RabbitmqApplication.class, args);
        RabbitMQConsumerProvider consumerProvider = context.getBean(RabbitMQConsumerProvider.class);

        while (true) {
            /***
             *  direct 消费
             */
            consumerProvider.subscribe("bsf.rabbitmq.queue.demo", null, null, (msg) -> {
                if (Objects.nonNull(msg)) {
                    System.out.println("direct Message" + new JsonSerializer().serialize(msg));
                }

            }, String.class);

            /***
             *  topic 消费
             */
            consumerProvider.subscribe("bsf.rabbitmq.topic.queue.demo", null, null, (msg) -> {
                if (Objects.nonNull(msg)) {
                    System.out.println("topic Message" + new JsonSerializer().serialize(msg));
                }

            }, String.class);

            /***
             *  Fanout 消费
             */
            consumerProvider.subscribe("bsf.rabbitmq.Fanout.queue.demo", null, null, (msg) -> {
                if (Objects.nonNull(msg)) {
                    System.out.println("Fanout Message" + new JsonSerializer().serialize(msg));
                }

            }, String.class);


            /***
             *  delayed 消费
             */
            consumerProvider.subscribe("bsf.rabbitmq.queue.delayed.demo", null, null, (msg) -> {
                if (Objects.nonNull(msg)) {
                    System.out.println("delayed Message" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-mm-dd HH:mm:ss")) + new JsonSerializer().serialize(msg));
                }

            }, String.class);

            consumerProvider.subscribe(new RabbitMQSubscribeRunable()
                    .setQueueName("bsf.rabbitmq.Fanout.queue.demo")
                    .setType(String.class)
                    .setRunnable((msg)->{
                        System.out.println("Fanout Message" + new JsonSerializer().serialize(msg));
                    }));
        }
    }

    /**
     * 订阅消息（常用）
     */
    @GetMapping("/sendDirectMessage")
    public void sendMessage() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int j = 0; j < 100; j++) {
            val build = RabbitMQSendMessage.builder().exchange("bsf.rabbitmq.exchange.demo")
                    .exchangeTypeEnum(ExchangeTypeEnum.DIRECT)
                    .enableDeadQueue(false)
                    .delayTimeEnum(DelayTimeEnum.None)
                    .routingKey("bsf.rabbitmq.rk.demo").build()
                    .setQueueName("bsf.rabbitmq.queue.demo")
                    .setMsg("bsf rabbitmq message " + atomicInteger.getAndIncrement());
            rabbitMQProducerProvider.sendMessage(build);
        }
    }

    /**
     * 订阅消息（常用）
     */
    @GetMapping("/sendDelayedMessage")
    public void sendDelayedMessage() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int j = 0; j < 100; j++) {
            val build = new RabbitMQSendMessage<String>()
                    .setExchange("bsf.rabbitmq.exchange.delayed.demo")
                    .setExchangeTypeEnum(ExchangeTypeEnum.DELAYED)
                    .setEnableDeadQueue(false)
                    .setDelayTimeEnum(DelayTimeEnum.S05)
                    .setRoutingKey("bsf.rabbitmq.delayed.rk.demo")
                    .setQueueName("bsf.rabbitmq.queue.delayed.demo")
                    .setMsg(" bsf rabbitmq delayed message " + atomicInteger.getAndIncrement());
            rabbitMQProducerProvider.sendMessage(build);
        }
    }


    /**
     * 主题消息
     */
    @GetMapping("/sendTopicMessage")
    public void sendTopicMessage() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int j = 0; j < 100; j++) {
            val build = new RabbitMQSendMessage<String>()
                    .setExchange("bsf.rabbitmq.exchange.topic.demo")
                    .setExchangeTypeEnum(ExchangeTypeEnum.TOPIC)
                    .setEnableDeadQueue(true)
                    .setRoutingKey("bsf.rabbitmq.rk.topic.demo")
                    .setMsg("bsf rabbitmq sendTopicMessage " + atomicInteger.getAndIncrement())
                    .setQueueName("bsf.rabbitmq.topic.queue.demo");
            rabbitMQProducerProvider.sendMessage(build);
        }

    }

    /**
     * 广播消息
     */
    @GetMapping("/sendFanoutMessage")
    public void sendFanoutMessage() {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int j = 0; j < 100; j++) {
            val build = RabbitMQSendMessage.builder().exchange("bsf.rabbitmq.exchange.Fanout.demo")
                    .exchangeTypeEnum(ExchangeTypeEnum.FANOUT)
                    .enableDeadQueue(true)
                    .routingKey("bsf.rabbitmq.rk.Fanout.demo").build()
                    .setQueueName("bsf.rabbitmq.Fanout.queue.demo")
                    .setMsg("bsf rabbitmq sendFanoutMessage " + atomicInteger.getAndIncrement());
            rabbitMQProducerProvider.sendMessage(build);
        }

    }
}